Blockquote
REF
- KafkaConsumer API doc
- Kafka 之 Group 状态变化分析及 Rebalance 过程
- Kafka conumser redesign since 0.9.0
- 趣解reblance
consumer的新特性
在 0.9.0.0 之后的 Kafka,出现了几个新变动,一个是在 Server 端增加了 GroupCoordinator 这个角色,另一个较大的变动是将 topic 的 offset 信息由之前存储在 zookeeper 上改为存储到一个特殊的 topic 中(__consumer_offsets)。
__consumer_offsets
consumer_offsets 是 Kafka 内部使用的一个 topic,专门用来存储 group 消费的情况,默认情况下有50个 partition,每个 partition 三副本,而具体 group 的消费情况要存储到哪一个 partition 上,是根据 abs(GroupId.hashCode()) % NumPartitions
来计算(其中,NumPartitions 是consumer_offsets 的 partition 数,默认是50个)的。
offset
- Last Commiteed Offset:consumer上一次commit的位置;
- Current Position:cosumer当前消费到的位置,last coomitted offset 到current position之间的就是当前正被consumer处理的消息。
- High Watermark:被成功备份到所有replicas的最新位置,该位置之前的所消息都被认为是安全可消费的。
- Log End Offset:Producer 写入到 Kafka 中的最新一条数据的 offset;
coordinator机制
kafka server将partiton分配的工作转移到了Client上(Producer中也可以看到),server保留的是group的分配工作,这样的设计是为了方便client使用灵活的partition分配方案。
coordinator in server
server上的Coordinator 负责reblance、Offset提交、心跳,实现主要代码在kafka.coordinator.GroupCoordinator.scala
一个consumer group对应一个coordinator
coordinator 状态机
共有5种状态:
- Dead:group中没有成员,并且metadata已被移除,这种状态响应各种请求都是一个response: UNKNOWN_MEMBER_ID
- Empty:Group 没有任何成员,如果所有的 offsets 都过期的话就会变成 Dead,一般当 Group 新创建时是这个状态,也有可能这个 Group 仅仅用于 offset commits 并没有任何成员,该状态至响应
JoinGroupRequest
- Stable:这种状态下,coordinator已经获得了激活的generation,或者目前没有成员,等待第一个joinGroup。该状态还会接受成员的heartbeats。
- PreparingRebalance:准备重平衡状态,例如member发生变化
- AwaitingSync:所有的joinGroup请求都接受到后,会选举产生一个leader,这个状态就是在等待leader发送partition的分配结果(SyncGroupRequest)。
状态机如下:
coordinator in client
根据KafkaConsumer主要方法
pollOnce
来跟踪client上的coordinator工作过程
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
coordinator.ensureCoordinatorReady();
if (subscriptions.partitionsAutoAssigned())
coordinator.ensurePartitionAssignment();
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
long now = time.milliseconds();
client.executeDelayedTasks(now);
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
fetcher.sendFetches();
client.poll(timeout, now);
return fetcher.fetchedRecords();
}
第一步、投石问路——确保server端有可用的coordinator
public void ensureCoordinatorReady() { |
如果请求有broker响应了,那么将将该节点做为coordinator:this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
groupCoordinatorResponse.node().host(),
groupCoordinatorResponse.node().port());
第二步、确保group是可用的
首先,对group需要reJoin的情况进行梳理:
- 有consumer离开当前group,client会发送一个
LeaveGroupRequest
如: - 不再订阅某个topic
- ConsumerCoordinator执行关闭操作
- 发送SyncGroupRequest后收到的response异常
- 发送HeartbeatRequest后收到的response异常,包括:
REBALANCE_IN_PROGRESS
(正在重平衡),ILLEGAL_GENERATION
(generation值不合法),UNKNOWN_MEMBER_ID
(未知的成员)public void ensureActiveGroup() {
if (!needRejoin()) return;
//如果设置了auto commit,那么在rebalance之前先提交,再准备reJoin
if (needsJoinPrepare) {
onJoinPrepare(generation, memberId);
needsJoinPrepare = false;
}
while (needRejoin()) {
ensureCoordinatorReady();
//在reblance执行之前,需要确保所有JoinGroup的请求都被处理掉了,避免频繁的reblance
if (client.pendingRequestCount(this.coordinator) > 0) {
client.awaitPendingRequests(this.coordinator);
continue;
}
RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
future.addListener(new RequestFutureListener<ByteBuffer>() {
@Override
public void onSuccess(ByteBuffer value) {
onJoinComplete(generation, memberId, protocol, value);
needsJoinPrepare = true;
heartbeatTask.reset();
}
});
client.poll(future);
}
}
JoinGroupRequest中包含的信息有:
- groupId
- memberId
- subscriptions && PartitionAssignor(默认:RangeAssignor)
第三步、处理JoinGroupResponse
这是通过回调函数实现的,具体的是JoinGroupResponseHandler
的handle方法:public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = Errors.forCode(joinResponse.errorCode());
if (error == Errors.NONE) {
//记录新的generation
AbstractCoordinator.this.generation = joinResponse.generationId();
AbstractCoordinator.this.rejoinNeeded = false;
//leader与follower区别对待
if (joinResponse.isLeader()) {
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
} else if {
...
}
}
server的coordinator在收到joinGroupRequest后,会为每个group组选择一个member任命为leader。
leader在收到response后,会进行partition的分配,并且将分配结果发送给server的coordinatorprivate RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
joinResponse.members());
SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
return sendSyncGroupRequest(request);
}
分区分配的逻辑:protected Map<String, ByteBuffer> performAssignment(String leaderId,
String assignmentStrategy,
Map<String, ByteBuffer> allSubscriptions) {
//获取分配规则(默认range)
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, Subscription> subscriptions = new HashMap<>();
//获取订阅的topic
for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
subscriptions.put(subscriptionEntry.getKey(), subscription);
allSubscribedTopics.addAll(subscription.topics());
}
this.subscriptions.groupSubscribe(allSubscribedTopics);
metadata.setTopics(this.subscriptions.groupSubscription());
//对每个订阅的topic进行分区分配
Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
Map<String, ByteBuffer> groupAssignment = new HashMap<>();
for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
groupAssignment.put(assignmentEntry.getKey(), buffer);
}
return groupAssignment;
}
follower只需要发送一个不包含分区结果的SyncGroupRequestprivate RequestFuture<ByteBuffer> onJoinFollower() {
SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
memberId, Collections.<String, ByteBuffer>emptyMap());
return sendSyncGroupRequest(request);
}
关闭KafkaListnerContainer时发生了些什么
一共三个操作:if(this.listenerInvokerFuture != null) {
this.stopInvokerAndCommitManualAcks();
}
try {
this.consumer.unsubscribe();
} catch (WakeupException var8) {
;
}
this.consumer.close();
主要看unsubscribe方法:this.subscriptions.unsubscribe();
this.coordinator.maybeLeaveGroup();
this.metadata.needMetadataForAllTopics(false);
- 首先取消了所有订阅
- 然后发送一个LeaveGroupRequest,并且将memberId设为
UNKNOWN
,needRejoin设为true
当所有member离开时,server的coordinator进入
Empty
状态
小实验
创建两个ListenerContainer,订阅同一个topic(“test” with partitions=6),并且在同一个group内:ConcurrentMessageListenerContainer testContainer = new ContainerBuilder()
// .setTopic("kafka.scanAllBackup")
.setTopic("test")
.setGroupId("g1")
.setListenerName("testListener")
.setBeanName("testContainer")
...
ConcurrentMessageListenerContainer secondContainer = new ContainerBuilder()
// .setTopic("kafka.scanAllBackup")
.setTopic("test")
.setGroupId("g1")
.setListenerName("testListener")
.setBeanName("secondContainer")
实验结果
开启第一个Container
此时的generation = 1,分配的分区数为6
开启第二个Container
因为有新的member加入,因此触发了Rebalance,根据Range分配规则,每个consumer获得3个分区
简述下Range Assignor规则:假如topic有N个分区(按number排序),group组内有M个consumer(按字典序排列)订阅,那么就现将分区分成M份,每份N/M个,如果不能整除,就将余数(N%M)分配给前N%M个consumer
关闭第一个Container
有member离开group,再次触发Reblance,第二个container独享6个分区
实验二
在使用kafka的时候有个现象:如果将正在消费的consumer关闭、重启,那么在短时间内他是无法接收到消息的,从日志上看得话就是server coordinator没有为这个consumer分配分区,为了详解这种机制,我将发送JoinGroup的debug信息输出。测试用例同上
开启第一个container
在6分09秒发送了一个joinGroup请求,但是并没有得到反馈。
g1这个group在server中的generation=1,因此新的joinGroup请求进来后,server进入
PreparingRebalance
状态。
开启第二个Container,关闭第一个container
发送了第二个joinGroup请求,并没有马上收到反馈,在6分29秒关闭了第一个container,经过了96s后,收到了反馈,并且指定leader为client2(也就是当前的consumer),client1是member,成功分配到了3个分区。
在10分06秒的时候,server再次执行了重平衡,client2再次发送了joinGroup请求,马上得到了反馈,并且这次独享6个分区。
产生这次重平衡的原因是:8分05秒server反馈了joinGroup请求,session_timeout设置的是两分钟,在10分05秒的时候,server依然未收到client1的heartbeat,因此触发了重平衡
server.log
[2017-03-02 19:06:03,197] INFO [GroupCoordinator 2]: Stabilized group g1 generation 1 (kafka.coordinator.GroupCoordinator) |
- 6分03秒,server收到来自leader的syncGroup请求,coordinator进入
Stable
状态。 - 随后,非正常关闭container,
所有member离开了group,coordinator进入coordinator无法收到members的心跳Empty
状态 - 6分10秒,client重启,并发送了joinGroup请求,memberId为
UNKNOWN
,server进入PreparingRebalance
状态
疑点:
coordinator在收到client1的leaveGroup请求后为啥还会响应其joinGroup请求嘞?coordinator因为没有感知到client1的离开,所以才会长时间等待